[core] Implement a thread pool and call the CPython API on all threads within the same concurrency group #52575
Conversation
What will the behavior be if a user does try to access thread-local state currently? There are really only two acceptable options here IMO unless there is a significant …
Is there a reason why we can't fix the underlying problem and ensure that the initializer is run once per thread and the same thread runs the corresponding release call? Naively, it seems like this should only require keeping a map of …
As I mentioned above, …
Got it, in that case it sounds like we'll need to use the lower-level APIs to manage our own basic thread pool and implement the init/release logic. This should not be too challenging given how simple the usage in …
I think it's fine not to support thread-local state for concurrency groups with more than one thread. I remember discussing this behavior with @stephanie-wang several months ago when we were trying to move the RayCG execution loop to the main thread. However, executing the initializer is not only for thread-local state; it also aligns Ray more closely with the Python interpreter's assumptions. That is, once a thread with a given thread ID exits, it cannot be restarted. If we want to run the initializer/releaser on each thread, we may need to get rid of the …
I just saw #52575 (comment) after I submitted #52575 (comment). Implementing our own thread pool makes sense. I want to confirm with you that the goal of implementing our own thread pool to initialize and release Python threads is not to support thread-local state; rather, it is to fulfill the Python interpreter's assumptions, as I mentioned in the previous comment. Users should still not use thread-local state for a concurrency group with multiple threads because of the user interface issue mentioned in #52575 (comment).
Yes exactly. I agree we should not encourage users to do this, but we should fulfill the Python interpreter's assumptions. This will also avoid undefined behavior and/or scary stack traces like the one in this ticket. As an example, there might be library code that uses thread-local storage that users aren't even aware of. We would want to make sure that the code at least runs correctly and doesn't fail in unexpected and confusing ways.
@edoakes Is it okay to implement our own thread pool using a naive round-robin approach? If not, I'd prefer to merge this PR first and follow up with another PR to implement it after my on-call rotation. I took a look at the source code of the post function in boost::asio::thread_pool; it's not trivial if we plan to implement the scheduler ourselves.
The use case here is quite simple and the work is coarse-grained (task executions), so we should be able to use a straightforward approach. Pseudocode:
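(The pseudocode block that followed did not survive extraction; below is a plausible reconstruction based on the surrounding init/release discussion, illustrative only and not the original snippet.)

    ThreadPool(n, initializer, releaser):
        for i in 1..n:
            spawn thread:
                initializer()                 # runs once, on this thread
                while task = queue.pop():     # blocking pop; empty on shutdown
                    task()
                releaser()                    # same thread as initializer()

    Post(task): queue.push(task)
    Join():     queue.close(); wait for all threads to exit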
@@ -236,6 +240,39 @@ def test_tasks_on_different_executors(self, ray_start_regular_shared):
     assert value == "f2"

+def test_multiple_threads_in_same_group(ray_start_regular_shared):
- We can't use threading.enumerate() to check the number of threads because the threads are not launched by Python.
- The threads are visible to py-spy because it reads thread information from the OS.
 /// Join the thread pool.
-void BoundedExecutor::Join() { pool_->join(); }
+void BoundedExecutor::Join() {
+  work_guard_.reset();
Maintain the previous behavior. We can't assume that Join will always be called after Stop; therefore, we need to reset work_guard_ here. It's fine to call work_guard_.reset() twice; the second call does nothing.
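To illustrate why the double reset is safe, here is a minimal sketch of the Stop/Join interaction (the class and member names are assumptions, not the PR's exact code):

    #include <boost/asio/executor_work_guard.hpp>
    #include <boost/asio/io_context.hpp>
    #include <thread>
    #include <vector>

    // Sketch: an executor that keeps its io_context alive with a work guard.
    class SketchExecutor {
     public:
      explicit SketchExecutor(int num_threads)
          : work_guard_(boost::asio::make_work_guard(io_context_)) {
        for (int i = 0; i < num_threads; i++) {
          threads_.emplace_back([this] { io_context_.run(); });
        }
      }

      void Stop() {
        work_guard_.reset();   // let run() return once queued work drains
        io_context_.stop();
      }

      void Join() {
        work_guard_.reset();   // a second reset() is a no-op, so this is safe
        for (auto &t : threads_) {
          if (t.joinable()) t.join();
        }
      }

     private:
      boost::asio::io_context io_context_;
      boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard_;
      std::vector<std::thread> threads_;
    };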
@edoakes CI tests pass!
      releaser();
    }
  });
  init_future.wait();
I think instead of having this future that waits for each thread in sequence, you should kick off all threads and have a latch that the constructor will wait on before exiting: https://en.cppreference.com/w/cpp/thread/latch (in Boost for C++17: https://www.boost.org/doc/libs/1_88_0/boost/thread/latch.hpp).
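A minimal sketch of this suggestion, assuming C++20's std::latch (the StartAll helper is hypothetical):

    #include <functional>
    #include <latch>   // C++20; boost::latch offers the same shape pre-C++20
    #include <thread>
    #include <vector>

    // Kick off all threads at once; the caller blocks on a latch until every
    // thread has finished running its initializer.
    void StartAll(int n, const std::function<void()> &initializer,
                  std::vector<std::thread> &threads) {
      std::latch init_done(n);
      for (int i = 0; i < n; i++) {
        threads.emplace_back([&initializer, &init_done] {
          initializer();           // e.g., set up per-thread Python state
          init_done.count_down();
          // ... thread continues into its run loop ...
        });
      }
      init_done.wait();  // returns only after all n initializers completed
    }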
We could also avoid implementing our own thread pool and keep using boost::thread_pool with this. Having each thread run an io_context when it actually doesn't have to do any IO means we're wasting some time doing epoll, etc., when it doesn't have to:
- have a latch/barrier that waits for all of the init functions to start running (because each thread is blocked on the latch, you can guarantee that each of the inits will be posted to its own thread; see the sketch after this list)
- have a latch/barrier that waits for all of the init functions to finish at the end
- call wait first to make sure all threads are idle, post releaser functions that wait on a latch/barrier again, and then join and stop
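A rough sketch of the init phase of that plan (the helper name is hypothetical; as the reply below points out, asio does not document that the blocked tasks land on distinct threads):

    #include <boost/asio/post.hpp>
    #include <boost/asio/thread_pool.hpp>
    #include <functional>
    #include <latch>

    // Post n init tasks that all block on a latch, so that each task should
    // occupy a distinct pool thread before any of them returns.
    void InitAllPoolThreads(boost::asio::thread_pool &pool, int n,
                            const std::function<void()> &initializer) {
      std::latch all_started(n);
      std::latch all_done(n);
      for (int i = 0; i < n; i++) {
        boost::asio::post(pool, [&] {
          all_started.count_down();
          all_started.wait();  // hold this thread until all n tasks are running
          initializer();
          all_done.count_down();
        });
      }
      all_done.wait();         // caller resumes after every initializer ran
    }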
> I think instead of having this future that waits for each thread in sequence, you should kick off all threads and have a latch that the constructor will wait on before exiting: https://en.cppreference.com/w/cpp/thread/latch (in Boost for C++17: https://www.boost.org/doc/libs/1_88_0/boost/thread/latch.hpp)
Thanks! Happy to learn new C++ techniques! I’ll take a look. I’m not sure how much benefit running all threads in parallel will provide in Python if the initializer needs to acquire the GIL.
> we could also avoid implementing our own threadpool and keep using boost::threadpool with this. Having each thread run an io_context when it actually doesn't have to do any io means we're wasting some time doing epoll, etc. when it doesn't have to
This seems hacky to me. Is this a common usage pattern for boost::asio::thread_pool?

"Because the thread is blocked on the latch you can guarantee that each of the inits will be posted to its own thread" => we would be relying on behavior that boost::asio neither guarantees nor documents. thread_pool::post() doesn't guarantee which thread will execute a given event.
> post releaser functions that wait on a latch/barrier again, and then join and stop

We need to ensure that the releasers run on the same threads as the initializers that created them. I am not sure whether there is an easy way to do that.
It's a bit over-engineered and may introduce subtle issues, in my opinion. If we don't have strong evidence of a benefit, I'd prefer to keep the current implementation. WDYT?
Unsure how common of a usage pattern it is. We could ensure initializer/releaser pairs with a thread_id -> releaser map or something similar.

I'm OK with keeping the current, simpler implementation; I doubt the overhead of io_context really matters for us. Another option is just implementing a lightweight thread pool with condition variables and a task queue (see the sketch below) if working around asio::thread_pool with waits, latches, and thread IDs is rough.

Leaving the decision to @edoakes.
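For reference, a "lightweight thread pool with condition variables and a task queue" might look roughly like this minimal sketch (class and member names are illustrative, not a concrete proposal):

    #include <condition_variable>
    #include <functional>
    #include <mutex>
    #include <queue>
    #include <thread>
    #include <vector>

    class SketchQueuePool {
     public:
      explicit SketchQueuePool(int num_threads) {
        for (int i = 0; i < num_threads; i++) {
          threads_.emplace_back([this] {
            while (true) {
              std::function<void()> task;
              {
                std::unique_lock<std::mutex> lock(mu_);
                cv_.wait(lock, [this] { return stopped_ || !tasks_.empty(); });
                if (stopped_ && tasks_.empty()) return;  // drained; exit loop
                task = std::move(tasks_.front());
                tasks_.pop();
              }
              task();  // run outside the lock
            }
          });
        }
      }

      void Post(std::function<void()> task) {
        {
          std::lock_guard<std::mutex> lock(mu_);
          tasks_.push(std::move(task));
        }
        cv_.notify_one();
      }

      void JoinAll() {
        {
          std::lock_guard<std::mutex> lock(mu_);
          stopped_ = true;
        }
        cv_.notify_all();
        for (auto &t : threads_) t.join();
      }

     private:
      std::mutex mu_;
      std::condition_variable cv_;
      std::queue<std::function<void()>> tasks_;
      bool stopped_ = false;
      std::vector<std::thread> threads_;
    };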
> Leaving decision to @edoakes

Which decision are you referring to:
(1) the current implementation versus a "lightweight thread pool with condition variables and a task queue," or
(2) the current implementation versus the boost::asio::thread_pool solution?

For (1), what benefits does the condition-variable solution offer? We can discuss whether it's worth it or not. For (2), if there's no strong evidence of benefits or a common usage pattern, it's a strong no from my perspective.
Per offline discussion, we'll go with the existing implementation in the PR (a basic hand-rolled thread pool using io_context). The reasoning:
- We aren't forcing the asio thread pool into a model it isn't built for.
- It's very simple, so without justification it's preferred over a more complex implementation that might be more optimal.

@kevin85421 to run the full suite of benchmarks to validate (2).
Use latch 1130e71
In PR #52575, the debug build keeps failing without any error messages. When I change the instance type from medium to large, all CI tests pass. I suspect it may be an OOM issue.
I did a benchmark comparing the io_context implementation and the condition-variable implementation: 4 threads, posting 1,000,000 times. …
@edoakes there are still some comments that I haven't addressed. I will ping you when all comments are addressed.
@edoakes All comments have been addressed.
Why are these changes needed?
We see the following error message in the CI runs of test_threaded_actor.py (example1, example2). The message "Fatal Python error: PyGILState_Release: auto-releasing thread-state, but no thread-state for this thread" is very scary, but it does not cause any tests to fail.
The root cause is that PyGILState_Release is called on a thread that has never called PyGILState_Ensure. See the CPython source code for more details. We can't control which thread in the thread pool will run the initializer/releaser; hence, if a concurrency group has more than one thread, the error message above may be printed when we gracefully shut down an actor (i.e., ray.actor.exit_actor()).
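For context, the invariant the CPython GIL-state API imposes looks like this (a minimal illustration; the calls are real CPython APIs, while the framing comments describe this PR's setting):

    #include <Python.h>

    // The GIL-state API must be paired on the same OS thread. In this PR's
    // terms: a pool thread's initializer and releaser both run on that thread.
    void RunOnPoolThread() {
      PyGILState_STATE state = PyGILState_Ensure();  // e.g., in the initializer
      // ... execute Python work on this thread ...
      PyGILState_Release(state);  // must run on the same thread as Ensure();
      // releasing on a thread that never called PyGILState_Ensure triggers
      // the fatal error quoted above.
    }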
In this PR, we implement our own thread pool using std::thread, ensuring that both the initializer and the releaser run on the same thread. Consequently, from the Python interpreter's perspective, all Python threads in the same concurrency group remain active even after they finish executing Ray tasks.
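A simplified sketch of this design (names are assumptions, not the PR's exact BoundedExecutor code): each std::thread runs the initializer before entering the shared io_context event loop and the releaser after leaving it, so both always execute on the same thread.

    #include <boost/asio/executor_work_guard.hpp>
    #include <boost/asio/io_context.hpp>
    #include <boost/asio/post.hpp>
    #include <functional>
    #include <latch>
    #include <thread>
    #include <utility>
    #include <vector>

    class SketchBoundedExecutor {
     public:
      SketchBoundedExecutor(int num_threads,
                            std::function<void()> initializer,
                            std::function<void()> releaser)
          : work_guard_(boost::asio::make_work_guard(io_context_)) {
        std::latch init_done(num_threads);
        for (int i = 0; i < num_threads; i++) {
          threads_.emplace_back([this, &init_done, initializer, releaser] {
            initializer();          // e.g., register this thread with CPython
            init_done.count_down();
            io_context_.run();      // execute posted tasks until Join()
            releaser();             // same thread as initializer()
          });
        }
        init_done.wait();  // constructor returns after all inits finish
      }

      template <typename F>
      void Post(F &&fn) {
        boost::asio::post(io_context_, std::forward<F>(fn));
      }

      void Join() {
        work_guard_.reset();  // let run() return once the queue drains
        for (auto &t : threads_) t.join();
      }

     private:
      boost::asio::io_context io_context_;
      boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard_;
      std::vector<std::thread> threads_;
    };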
Related issue number
Closes #51071
Checks
- I've signed off every commit (git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If I've added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.